热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

提案|攻击者_Tendermint共识分析

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Tendermint共识分析相关的知识,希望对你有一定的参考价值。概述Tendermint

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Tendermint 共识分析相关的知识,希望对你有一定的参考价值。



概述

Tendermint的共识算法可以看成是POS+BFT,Tendermint在进行BFT共识算法确认区块前 ,首先使用POS算法从Validators中选举出Proposer。然后由Proposer进行提案,最后使用BFT算法生成区块。Tendermint 的共识协议使用的gossip协议。

其中使节点成为Validator有两种方法,具体可参考:https://docs.tendermint.com/master/nodes/validators.html


round-robin

从Validators中选举出proposer需要使用round-robin协议,这篇文章很好的解释了round-robin协议:https://zhuanlan.zhihu.com/p/84962067


round-based

在同一高度确认一个区块需要使用round-based协议,包括以下五个步骤:NewHeight, Propose, Prevote, Precommit 和 Commit

其中Propose、Prevote、Precommit又被称为round,在同一高度确认一个区块可能需要多个round。以下情况就会需要多个round:


  1. 指定的proposer节点不在线
  2. 由proposer提交的区块时无效的
  3. 被提案的区块没有及时的广播
  4. proposal block有效,但是没有足够多的节点在Precommit 阶段及时收到对应的 +2/3 的prevotes
  5. proposal block有效,也有足够多的节点接收到了+2/3 的prevotes,但是没有足够多的节点收到+2/3 的 precommits

round-based过程如下:

+-------------------------------------+
v |(Wait til `CommmitTime+timeoutCommit`)
+-----------+ +-----+-----+
+----------> | Propose +--------------+ | NewHeight |
| +-----------+ | +-----------+
| | ^
|(Else, after timeoutPrecommit) v |
+-----+-----+ +-----------+ |
| Precommit | <------------------------&#43; Prevote | |
&#43;-----&#43;-----&#43; &#43;-----------&#43; |
|(When &#43;2/3 Precommits for block found) |
v |
&#43;--------------------------------------------------------------------&#43;
| Commit |
| |
| * Set CommitTime &#61; now; |
| * Wait for block, then stage/save/commit block; |
&#43;--------------------------------------------------------------------&#43;

Tendermint的共识算法大体流程就是这些&#xff0c;具体的细节将在分析源码的时候进行探讨。

这个文章对共识进行详细描述&#xff0c;并且也解释了重要的锁机制&#xff1a;https://www.odaily.com/post/5134145


源码分析

Tendermint的共识功能主要在tendermint/consensus/state.go文件里进行实现


NewState

func NewState(
config *cfg.ConsensusConfig,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
txNotifier txNotifier,
evpool evidencePool,
options ...StateOption,
) *State
cs :&#61; &State
config: config,
blockExec: blockExec,
blockStore: blockStore,
txNotifier: txNotifier,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
statsMsgQueue: make(chan msgInfo, msgQueueSize),
done: make(chan struct),
doWALCatchup: true,
wal: nilWAL,
evpool: evpool,
evsw: tmevents.NewEventSwitch(),
metrics: NopMetrics(),

// 设置一些默认函数&#xff0c;在reactor没有启动前可以被重写
cs.decideProposal &#61; cs.defaultDecideProposal
cs.doPrevote &#61; cs.defaultDoPrevote
cs.setProposal &#61; cs.defaultSetProposal
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0
cs.reconstructLastCommit(state)

cs.updateToState(state)
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService &#61; *service.NewBaseService(nil, "State", cs)
for _, option :&#61; range options
option(cs)

return cs


OnStart

Onstart通过WAL加载最新的state&#xff0c;并开启超时和接收消息协程

func (cs *State) OnStart() error
...
...
...
// Double Signing Risk Reduction
if err :&#61; cs.checkDoubleSigningRisk(cs.Height); err !&#61; nil
return err

// 开启接收信息的协程
go cs.receiveRoutine(0)
// schedule the first round!
// use GetRoundState so we don&#39;t race the receiveRoutine for access
cs.scheduleRound0(cs.GetRoundState())
return nil


receiveRoutine

这个函数就比较重要了&#xff0c;它处理了可能导致状态转换的消息。其中超时消息、完成一个提案和超过2/3的投票都会导致状态转换。

func (cs *State) receiveRoutine(maxSteps int)
onExit :&#61; func(cs *State)
// NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven&#39;t hit the WAL, but its ok because
// priv_val tracks LastSig
// close wal now that we&#39;re done writing to it
if err :&#61; cs.wal.Stop(); err !&#61; nil
cs.Logger.Error("failed trying to stop WAL", "error", err)

cs.wal.Wait()
close(cs.done)

defer func()
if r :&#61; recover(); r !&#61; nil
cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack()))
// stop gracefully
//
// NOTE: We most probably shouldn&#39;t be running any further when there is
// some unexpected panic. Some unknown error happened, and so we don&#39;t
// know if that will result in the validator signing an invalid thing. It
// might be worthwhile to explore a mechanism for manual resuming via
// some console or secure RPC system, but for now, halting the chain upon
// unexpected consensus bugs sounds like the better option.
onExit(cs)

()
for
if maxSteps > 0
if cs.nSteps >&#61; maxSteps
cs.Logger.Debug("reached max steps; exiting receive routine")
cs.nSteps &#61; 0
return


rs :&#61; cs.RoundState
var mi msgInfo
select
// 把有效交易添加到交易池的时候会设置TxAvailable
case <-cs.txNotifier.TxsAvailable():
cs.handleTxsAvailable()
// peer消息通道
case mi &#61; <-cs.peerMsgQueue:
if err :&#61; cs.wal.Write(mi); err !&#61; nil
cs.Logger.Error("failed writing to WAL", "err", err)

// 处理 proposal、block parts、votes的消息
cs.handleMsg(mi)
// 处理内部消息
case mi &#61; <-cs.internalMsgQueue:
err :&#61; cs.wal.WriteSync(mi) // NOTE: fsync
if err !&#61; nil
panic(fmt.Sprintf(
"failed to write %v msg to consensus WAL due to %v; check your file system and restart the node",
mi, err,
))

if _, ok :&#61; mi.Msg.(*VoteMessage); ok
// we actually want to simulate failing during
// the previous WriteSync, but this isn&#39;t easy to do.
// Equivalent would be to fail here and manually remove
// some bytes from the end of the wal.
fail.Fail() // XXX

// handles proposals, block parts, votes
cs.handleMsg(mi)
// 处理超时消息
case ti :&#61; <-cs.timeoutTicker.Chan(): // tockChan:
if err :&#61; cs.wal.Write(ti); err !&#61; nil
cs.Logger.Error("failed writing to WAL", "err", err)

// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti, rs)
case <-cs.Quit():
onExit(cs)
return



上面的函数运行完毕后&#xff0c;就可以等待进入状态跃迁的函数&#xff0c;进行共识了。
官方的共识流程图表示如下&#xff1a;
单节点共识完整流程的代码流程为&#xff1a;


  1. 首先进入enterNewRound
  2. 之后从enterNewRound进入enterPropose
  3. 进入enterPropose后,判断自己是不是validator.只有一个节点自己就是,进入defaultDecideProposal
  4. 进入defaultDecideProposal,把proposal和blockPartMsg发送到internalMsgQueue
  5. 收到internalMsgQueue的消息,然后进入handleMsg,通过handleMsg进入addProposalBlockPart
  6. 通过addProposalBlockPart 最后进入到enterPrevote
  7. 通过enterPrevote进入到defaultDoPrevote,对proposal进行签名,并发送到internalMsgQueue
  8. handleMsg对收到的消息进行处理,进入到tryAddVote
  9. tryAddVote判断vote正确,并且满足超过三分之二的情况,进入enterPrevoteWait
  10. 计时器超时,从enterPrevoteWait进入到enterPrecommit
  11. 通过enterPrevote对proposal进行再次签名,并发送到internalMsgQueue
  12. handleMsg对收到的消息进行处理,进入到tryAddVote
  13. tryAddVote判断vote正确,进入enterCommit,这里涉及情况比较多(在多个节点的条件下).
  14. enterCommit落地区块,将区块发送给abci,收到返回后,此次共识结束.

由于代码较多&#xff0c;会把相对不太重要的代码给省略掉。这里主要列举对共识流程重要的代码。


enterNewRound

func (cs *State) enterNewRound(height int64, round int32)
logger :&#61; cs.Logger.With("height", height, "round", round)
// 进行状态校验
if cs.Height !&#61; height || round < cs.Round || (cs.Round &#61;&#61; round && cs.Step !&#61; cstypes.RoundStepNewHeight)
logger.Debug(
"entering new round with invalid args",
"current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step),
)
return

// 开启定时器
if now :&#61; tmtime.Now(); cs.StartTime.After(now)
logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now)

logger.Debug("entering new round", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
// 如果有新的Validator就添加
validators :&#61; cs.Validators
if cs.Round < round
validators &#61; validators.Copy()
validators.IncrementProposerPriority(tmmath.SafeSubInt32(round, cs.Round))

// 开始一轮新的round
cs.updateRoundStep(round, cstypes.RoundStepNewRound)
cs.Validators &#61; validators
if round &#61;&#61; 0
// We&#39;ve already reset these upon new height,
// and meanwhile we might have received a proposal
// for round 0.
else
logger.Debug("resetting proposal info")
cs.Proposal &#61; nil
cs.ProposalBlock &#61; nil
cs.ProposalBlockParts &#61; nil

cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round&#43;1) to allow round-skipping
cs.TriggeredTimeoutPrecommit &#61; false
if err :&#61; cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err !&#61; nil
cs.Logger.Error("failed publishing new round", "err", err)

cs.metrics.Rounds.Set(float64(round))
// 在我们进入round 0 之前要等待交易在mempool中设置为available&#xff0c;
// 如果最后一个区块改变了app hash我们需要一个空的proof区块&#xff0c;并且立即进入enterProposer函数
waitForTxs :&#61; cs.config.WaitForTxs() && round &#61;&#61; 0 && !cs.needProofBlock(height)
if waitForTxs
if cs.config.CreateEmptyBlocksInterval > 0
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
cstypes.RoundStepNewRound)

else
// 进入enterPropose
cs.enterPropose(height, round)



enterPropose

func (cs *State) enterPropose(height int64, round int32)

...
...
// 节点验证
if cs.privValidatorPubKey &#61;&#61; nil
// If this node is a validator & proposer in the current round, it will
// miss the opportunity to create a block.
logger.Error("propose step; empty priv validator public key", "err", errPubKeyIsNotSet)
return

address :&#61; cs.privValidatorPubKey.Address()
// if not a validator, we&#39;re done
if !cs.Validators.HasAddress(address)
logger.Debug("node is not a validator", "addr", address, "vals", cs.Validators)
return

// 判断当前节点是否为proposer&#xff0c;如果是的话就开始准备提案
if cs.isProposer(address)
logger.Debug(
"propose step; our turn to propose",
"proposer", address,
)
cs.decideProposal(height, round)
else
logger.Debug(
"propose step; not our turn to propose",
"proposer", cs.Validators.GetProposer().Address,
)



decideProposal

func (cs *State) defaultDecideProposal(height int64, round int32)

...
...

// 创建proposal
propBlockID :&#61; types.BlockIDHash: block.Hash(), PartSetHeader: blockParts.Header()
proposal :&#61; types.NewProposal(height, round, cs.ValidRound, propBlockID)
p :&#61; proposal.ToProto()
// 等待最大数量的proposal
ctx, cancel :&#61; context.WithTimeout(context.TODO(), cs.config.TimeoutPropose)
defer cancel()
// 对proposal进行签名
if err :&#61; cs.privValidator.SignProposal(ctx, cs.state.ChainID, p); err &#61;&#61; nil
proposal.Signature &#61; p.Signature
// 把数据发送到 sendInternalMessage channel中
// 这个channel在receiveRoutine函数启动、等待消息的传入
cs.sendInternalMessage(msgInfo&ProposalMessageproposal, "")
for i :&#61; 0; i < int(blockParts.Total()); i&#43;&#43;
part :&#61; blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo&BlockPartMessagecs.Height, cs.Round, part, "")

cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal)
else if !cs.replayMode
cs.Logger.Error("propose step; failed signing proposal", "height", height, "round", round, "err", err)



addProposalBlockPart

func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.NodeID) (added bool, err error)

...
...
if cs.Step <&#61; cstypes.RoundStepPropose && cs.isProposalComplete()
// Move onto the next step
cs.enterPrevote(height, cs.Round)
if hasTwoThirds // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(height, cs.Round)

else if cs.Step &#61;&#61; cstypes.RoundStepCommit
// If we&#39;re waiting on the proposal block...
cs.tryFinalizeCommit(height)

return added, nil

return added, nil


signAddVote

addProposalBlockPart 会进入到enterPrevote&#xff0c;再然后进入到doPrevote&#xff0c;doPrevote的默认函数为doPrevoteproposal&#xff0c;doPrevoteproposal通过调用signAddVote对进行Proposal签名,并发送到internalMsgQueue。

func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash var cpro_id = "u6885494";

推荐阅读
author-avatar
x囚徒已然初年
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有